
This article was translated by ChatGPT-4o; check this out if you read Chinese.

TL;DR

When you spawn processes with multiprocessing.Process and select fork as the start method, additional operations are performed beyond just invoking os.fork, such as running the after-fork hooks registered by other objects. These hooks are never triggered if you call os.fork directly, which can lead to errors.

Introduction

Recently, I dug a little into Python’s multiprocessing module and was struck by its limitations.

I had often heard that for multi-process programming you should not use os.fork directly but use multiprocessing.Process instead, though I never really understood why. Meanwhile, some popular projects, such as Gunicorn, use os.fork to spawn child processes.

I got some insights while solving this problem, which merely throws exceptions during cleanup and does not affect runtime behavior.

However, while on sick leave, I looked through issues on the CPython repository tagged with “expert-multiprocessing” and found an interesting one, where whether or not you start subprocesses the correct way significantly impacts runtime behavior.

The Problem

In this issue, the author was trying to

  1. create a Manager object in the main process;
  2. create a dict in the manager process, obtaining a corresponding DictProxy object in the main process, and initialize it;
  3. manually serialize the DictProxy;
  4. fork a child process and restore the DictProxy in the child process;
  5. access the dictionary concurrently from both processes.

Here’s the code:

import os
from multiprocessing.managers import SyncManager

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    child_id = os.fork()

    if child_id != 0:
        # in parent process
        # do something on the proxy object forever
        i = 0
        while True:
            d[i % 1000] = i % 3434
            i += 1
    else:
        # in child process

        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])

        # read the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])

Running this code throws an exception: _pickle.UnpicklingError: invalid load key, '\x0a'.

The author also provided a second snippet that did not use the standard dict type; instead, he registered a custom class with the manager and used locks inside its member methods. That version worked. At first this clue seemed useful, but it proved quite confusing later.

import os
from multiprocessing.managers import SyncManager
from threading import RLock

class SyncedDictionary:
    def __init__(self):
        # store the data in the instance
        self.data = {}
        self.lock = RLock()
        print(f"init from {os.getpid()}")

    def add(self, k, v):
        with self.lock:
            self.data[k] = v

    def get_values(self):
        with self.lock:
            return list(self.data.values())

if __name__ == '__main__':
    # custom class
    SyncManager.register("custom", SyncedDictionary)
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address

    print(f"from main pid {os.getpid()}")
    custom_dict = manager.custom()
    pickled_dict = custom_dict.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            custom_dict.add(i % 1000, i % 3434)
            i += 1
    else:
        for i in range(3):
            os.fork()  # even more child processes...

        print(os.getpid())
        # in children
        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])
        # read on the proxy object forever
        while True:
            list(proxy_obj.get_values())[:10]

Background

The Manager

In the multiprocessing module, users can create a Manager object in the main process, which starts a new process (called the manager process) and keeps a proxy object in the main process.

The proxy communicates with the manager process via sockets (it likely uses pipes on Windows, but I haven’t looked into it). Objects created through this proxy actually live in the manager process. The proxy sends instructions to the manager process to perform operations such as creating objects and calling member methods.

Other processes can access this object by creating a proxy and connecting to the socket address the manager process is listening on. This design makes it convenient to share objects between different processes.

Registering Classes with the Manager

The standard library registers some classes in advance, such as dict and Array. Custom classes can also be registered using the register method.
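To make this concrete, here is a minimal sketch (the Counter class and its names are mine, purely illustrative):

from multiprocessing.managers import SyncManager

# an illustrative class, not from the original issue
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

if __name__ == '__main__':
    # let the manager create Counter objects on demand
    SyncManager.register("Counter", Counter)
    manager = SyncManager(authkey=b'test')
    manager.start()

    counter = manager.Counter()  # lives in the manager process
    d = manager.dict()           # dict is pre-registered by the stdlib
    print(counter.increment())   # 1, executed in the manager process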

For more details, see the multiprocessing — Process-based parallelism documentation, or better yet, check the source code: multiprocessing/managers.py.

Conflicting Communication Streams

Based on the exception message, and after some debugging, I found that the communication streams between the two processes and the manager process can get interleaved, which makes decoding fail.

As mentioned above, the proxy in the main process communicates with the manager process through a socket to call methods. But how is this connection created?

The BaseProxy._callmethod method, which the proxy uses to call methods, first checks whether there is an existing connection in its TLS (thread-local storage). If so, it reuses it; otherwise, it invokes BaseProxy._connect to establish a new one:

def _connect(self):
    util.debug('making connection to manager')
    name = process.current_process().name
    if threading.current_thread().name != 'MainThread':
        name += '|' + threading.current_thread().name
    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'accept_connection', (name,))
    self._tls.connection = conn

def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referent and return a copy of the result
    '''
    try:
        conn = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        conn = self._tls.connection

    conn.send((self._id, methodname, args, kwds))
    kind, result = conn.recv()

And this is where the TLS slot that caches the connection is set up, in BaseProxy.__init__:

def __init__(self, token, serializer, manager=None, authkey=None,
             exposed=None, incref=True, manager_owned=False):
    with BaseProxy._mutex:
        tls_idset = BaseProxy._address_to_local.get(token.address, None)
        if tls_idset is None:
            tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
            BaseProxy._address_to_local[token.address] = tls_idset
    self._tls = tls_idset[0]

We can see that self._tls is of type util.ForkAwareLocal, a subclass of threading.local (so it lives in thread-local storage), defined as:

class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj: obj.__dict__.clear())

    def __reduce__(self):
        return type(self), ()

During a fork, the TLS data of the parent process is copied to the child process. If a connection was already created before forking, the parent and child will both communicate through the same socket, which causes conflicts.
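Here is a minimal standalone sketch of that copying behavior (my own illustration, not multiprocessing code): an attribute set on a threading.local before the fork is still visible in the child.

import os
import threading

tls = threading.local()
tls.connection = "socket created by the parent"

pid = os.fork()
if pid == 0:
    # the child's main thread is the thread that forked, so the
    # thread-local attribute was copied along with the process image
    print("child sees:", tls.connection)
    os._exit(0)

os.waitpid(pid, 0)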

Why does multiprocessing.Process work fine?

How does multiprocessing.Process avoid this problem?

Through the definition of ForkAwareLocal, we can see that it registers a lambda function (lambda obj: obj.__dict__.clear()) via register_after_fork in its constructor. This lambda function clears the object’s __dict__, which stores attributes.

register_after_fork adds functions to a global registry named _afterfork_registry, and these functions are called sequentially by _run_after_forkers:

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

To summarize: as long as we trigger the hook registered by ForkAwareLocal (the lambda) by calling _run_after_forkers, the attributes in self._tls are cleared. The next call to _callmethod then raises AttributeError, prompting the proxy to create a new connection to the manager process. There is no conflict anymore, since the processes no longer share a connection.
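Here is a small sketch of this mechanism in isolation (my own illustration; it calls the private hook manually in a single process just to show the effect):

from multiprocessing.util import ForkAwareLocal, _run_after_forkers

tls = ForkAwareLocal()
tls.connection = "stale connection inherited from the parent"

# normally invoked by multiprocessing after a fork; calling it manually
# here just demonstrates the registered lambda clearing __dict__
_run_after_forkers()

try:
    tls.connection
except AttributeError:
    print("cache cleared; _callmethod would now reconnect")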

But who calls _run_after_forkers? A global search shows it is called in BaseProcess._after_fork, and BaseProcess is the superclass of multiprocessing.Process.

@staticmethod
def _after_fork():
    from . import util
    util._finalizer_registry.clear()
    util._run_after_forkers()

If you're interested, you can dig deeper. The entire call chain is:

  • multiprocessing.context.Process.start
  • multiprocessing.process.BaseProcess.start
  • multiprocessing.context.Process._Popen
  • multiprocessing.context.ForkProcess._Popen
  • multiprocessing.popen_fork.Popen._launch
  • multiprocessing.process.BaseProcess._bootstrap
  • multiprocessing.process.BaseProcess._after_fork

Only when you spawn a process with multiprocessing.Process does this hook clear the attributes in self._tls and avoid the conflict.

Solutions

Knowing the cause, I proposed several solutions (all based on the author’s first code snippet in the GitHub issue):

1. Manually update the connection

proxy_obj = pickled_dict[0](*pickled_dict[1])
proxy_obj._connect()  # create a new connection

2. Manually call the hook function

from multiprocessing.util import _run_after_forkers

pid = os.fork()

if pid == 0:
    # in child
    _run_after_forkers()

3. Use multiprocessing.Process instead of os.fork

Just follow the documentation. Note that proxy objects can be passed to Process as arguments.
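Below is a minimal sketch of this approach, assuming the fork start method on Linux; the reader function and its name are mine, not from the issue:

import multiprocessing
from multiprocessing.managers import SyncManager

def reader(proxy):
    # runs in the child; the after-fork hooks have already cleared the
    # stale TLS entry, so the first call opens a fresh connection
    while True:
        print(list(proxy.values())[:10])

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    d = manager.dict()
    for i in range(1000):
        d[i] = i  # this caches a connection in the parent's TLS

    # request the fork start method explicitly
    ctx = multiprocessing.get_context("fork")
    child = ctx.Process(target=reader, args=(d,))
    child.start()

    i = 0
    while True:
        d[i % 1000] = i % 3434
        i += 1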

Debugging Tips

You can use this snippet to set the multiprocessing module’s internal logger to DEBUG and print more information, without modifying the source code:

import logging
from multiprocessing.util import log_to_stderr

log_to_stderr(logging.DEBUG)

PS

Why did I say the author’s second example was somewhat confusing?

Because the essential difference between the two snippets is not that one uses a built-in type and the other a custom type. The key is that the first assigns values to the shared object before forking, which creates a connection in TLS, while the second does not touch the proxy before forking. If you add an assignment loop before the fork in the second snippet, you get the same error:

for i in range(1000):
    custom_dict.add(i, i)

Conclusion

Multi-process programming in Python is not very pleasant: detailed documentation is lacking, so you have to dive into the source code. Moreover, to avoid resource leaks, the multiprocessing module ships many built-in hooks and auxiliary processes to manage resources, which weakens developers’ ability to manage them precisely.

This article was translated by ChatGPT-4o; check this out if you read Chinese.

A few days ago, while reading about JARM, a novel TLS server fingerprinting tool from Salesforce, I noticed that it uses a choose_grease() function when constructing the TLS ClientHello record, which prompted me to look into the GREASE mechanism.

import random

# Randomly choose a grease value
def choose_grease():
    grease_list = [b"\x0a\x0a", b"\x1a\x1a", b"\x2a\x2a", b"\x3a\x3a",
                   b"\x4a\x4a", b"\x5a\x5a", b"\x6a\x6a", b"\x7a\x7a",
                   b"\x8a\x8a", b"\x9a\x9a", b"\xaa\xaa", b"\xba\xba",
                   b"\xca\xca", b"\xda\xda", b"\xea\xea", b"\xfa\xfa"]
    return random.choice(grease_list)

What is GREASE?

GREASE stands for Generate Random Extensions And Sustain Extensibility and was formally defined in RFC 8701. It is a mechanism designed to mitigate the difficulty of extending the TLS protocol in the future.

If you are familiar with TLS, you know that during the handshake the client first sends a ClientHello record to the server, which includes the supported TLS versions, cipher suites, and some extensions. If the server can process it, it returns a ServerHello record with the chosen cipher suite and some extensions. For extensibility, some values of the cipher and extension fields are reserved for future use; for example, TLSv1.2 introduced AEAD ciphers.

GREASE likewise reserves certain values for these parameters, called GREASE values, which currently hold no meaning in the TLS protocol. For instance, the GREASE values for cipher suites and for the Application-Layer Protocol Negotiation (ALPN) extension are:

{0x0A,0x0A}, {0x1A,0x1A}, {0x2A,0x2A}, {0x3A,0x3A}, {0x4A,0x4A}, {0x5A,0x5A}, {0x6A,0x6A}, {0x7A,0x7A}, {0x8A,0x8A}, {0x9A,0x9A}, {0xAA,0xAA}, {0xBA,0xBA}, {0xCA,0xCA}, {0xDA,0xDA}, {0xEA,0xEA}, {0xFA,0xFA}

The protocol specifies that when the client sends a ClientHello, it may select one or more GREASE values to be sent to the server.

A client MAY select one or more GREASE cipher suite values and advertise them in the “cipher_suites” field.
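As an illustrative sketch (my own toy code, not from JARM or any TLS library), a GREASE-aware client could splice one such value into its advertised cipher suites like this:

import random

# the 16 two-byte GREASE values, 0x0a0a through 0xfafa
GREASE_VALUES = [bytes([v, v]) for v in range(0x0A, 0x100, 0x10)]

def build_cipher_suites(real_suites):
    """Prepend one random GREASE value to the genuinely supported suites."""
    return b"".join([random.choice(GREASE_VALUES)] + real_suites)

# e.g. TLS_AES_128_GCM_SHA256 (0x1301) preceded by a GREASE value
print(build_cipher_suites([b"\x13\x01"]).hex())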

If the client receives a record from the server (e.g. ServerHello, Certificate, EncryptedExtensions, etc.) containing a GREASE value, it must reject the record and close the connection.

Clients MUST reject GREASE values when negotiated by the server. In particular, the client MUST fail the connection if a GREASE value appears in any of the following

Furthermore, when the server finds GREASE values in ClientHello:

  1. It must not use these GREASE values for further negotiation and must treat them as ordinary reserved values.
  2. It must ignore these values and use other available values in this field for negotiation.

When processing a ClientHello, servers MUST NOT treat GREASE values differently from any unknown value. Servers MUST NOT negotiate any GREASE value when offered in a ClientHello. Servers MUST correctly ignore unknown values in a ClientHello and attempt to negotiate with one of the remaining parameters. (There may not be any known parameters remaining, in which case parameter negotiation will fail.)

This is a brilliant design, somewhat like a prisoner’s dilemma. Every TLS library can be used both as a client and as a server, and since several TLS libraries are popular, there is a good chance that a client and a server run different libraries, which forces all of them to honor this convention.

Similarly, for parameters initially sent by the server, the RFC stipulates the behavior of the client and server.

Why do we need GREASE?

One might wonder: if we want to reserve these values in the protocol, why don’t we just write them down in the protocol?

This mechanism is born of practical considerations. Although RFCs are well-known and authoritative, they are merely standards drafted by the community. To be used in practice, they must be implemented in libraries (such as OpenSSL, BoringSSL, etc.), and in this translation the logic may deviate from the designers’ intent. For foundational network protocols deployed at large scale, such as TLS, this can cause significant problems.

In the case of TLS, if GREASE did not constrain implementations, libraries might support only the values in the current protocol and throw exceptions on unknown values.

Such an implementation may work perfectly with the current protocol, but when the IETF upgrades it with more cipher-suite values, implementations of the newer protocol may fail to interoperate with older ones, hindering deployment of the new protocol.

It’s worth mentioning that GREASE values were initially also allocated for the Version parameter, but this was removed in the final RFC.

The values allocated above are thus no longer available for use as TLS or DTLS [RFC6347] version numbers.

Therefore, TLSv1.3 has to use the supported_versions extension to signal support for TLSv1.3, instead of bumping the Version field, so that it stays compatible with TLSv1.2. For more details, see this Cloudflare blog.

Conclusion

In a large-scale system like the Internet, change is difficult and always takes a long time; deploying and upgrading foundational network protocols is among those challenges. Many protocols seem straightforward at first glance, but you can always learn something when you dive deep.
